/**
* Copyright © 2018-2019, by 晓叹星沉.
*/
package org.aurora.mq.config;

import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.aurora.mq.annotation.ConsumeMode;
import org.aurora.mq.annotation.EnableRocketMQConfiguration;
import org.aurora.mq.annotation.RocketMQConsumer;
import org.aurora.mq.annotation.RocketMQTransactionProducer;
import org.aurora.mq.core.AbstractRocketMQPushConsumer;
import org.aurora.mq.core.AbstractRocketMQTransactionProducer;
import org.aurora.mq.core.DefaultRocketMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/**
 * 
 * <p>
 * RocketMQ自动配置类
 * </p>
 * @author 晓叹星沉
 * @since jdk1.8
 * 2019年7月9日
 *  
 */
@ConditionalOnBean(annotation = EnableRocketMQConfiguration.class)
@EnableConfigurationProperties(RocketMQProperties.class)
public class RocketMQAutoConfiguration implements ApplicationContextAware{

	private static Logger logger = LoggerFactory.getLogger(RocketMQAutoConfiguration.class);
	
	private ApplicationContext applicationContext;
	
	private Map<String, String> consumerMap = new ConcurrentHashMap<String, String>();
	
	private Map<String, DefaultMQPushConsumer> consumerCache = new ConcurrentHashMap<String, DefaultMQPushConsumer>();
	
	private Map<String, TransactionMQProducer> transactionProducerCache = new ConcurrentHashMap<String, TransactionMQProducer>();
	
	@Autowired
	private RocketMQProperties properties;
	
	@Bean(destroyMethod = "shutdown")
	@ConditionalOnMissingBean
	public DefaultRocketMQProducer createDefaultRocketMQProducer() throws Exception{
		DefaultMQProducer producer = new DefaultMQProducer(properties.getProducerGroup());
        producer.setNamesrvAddr(properties.getNameServerAddress());
        producer.setSendMsgTimeout(properties.getSendTimeout());
        producer.setSendMessageWithVIPChannel(properties.getVipChannelEnabled());
        producer.setProducerGroup(properties.getProducerGroup());
        producer.start();
        DefaultRocketMQProducer defaultRocketMQProducer = new DefaultRocketMQProducer(producer);
		return defaultRocketMQProducer;
	}
	
	@PostConstruct
	public void createConsumer() throws Exception{
		Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RocketMQConsumer.class);
		for (Map.Entry<String, Object> entry : beans.entrySet()) {
			String beanName = entry.getKey();
			Object bean = entry.getValue();
			createConsumer(beanName, bean);
        }
	}

	/**
	 * 创建consumer
	 * @param beanName
	 * @param bean
	 * @throws MQClientException
	 */
	private void createConsumer(String beanName, Object bean) throws MQClientException {
		RocketMQConsumer rocketMQConsumer = applicationContext.findAnnotationOnBean(beanName, RocketMQConsumer.class);
		
		Assert.hasLength(rocketMQConsumer.consumerGroup(), String.format("%s's consumerGroup must be defined", beanName));
		Assert.hasLength(rocketMQConsumer.topic(), String.format("%s's topic must be defined", beanName));
		
		if (!AbstractRocketMQPushConsumer.class.isAssignableFrom(bean.getClass())) {
		    throw new RuntimeException(bean.getClass().getName() + "未实现AbstractRocketMQPushConsumer抽象类");
		}
		
		Environment environment = applicationContext.getEnvironment();
		
		String consumerGroup = environment.resolvePlaceholders(rocketMQConsumer.consumerGroup());
		String topic = environment.resolvePlaceholders(rocketMQConsumer.topic());
		String tags = "*";
		if(rocketMQConsumer.tag().length == 1) {
		    tags = environment.resolvePlaceholders(rocketMQConsumer.tag()[0]);
		} else if(rocketMQConsumer.tag().length > 1) {
		    tags = StringUtils.join(rocketMQConsumer.tag(), "||");
		}
		
		String subTopic = topic + "#" + tags;

		// 检查consumerGroup
		//暂定一个应用内对于指定consumerGroup只允许一个消费者
		if(!StringUtils.isEmpty(consumerMap.get(consumerGroup + "#" + subTopic))) {
			String exist = consumerMap.get(consumerGroup + "#" + subTopic);
			throw new RuntimeException("消费组: " + consumerGroup + "已经订阅了" + exist);
		} else {
			consumerMap.put(consumerGroup + "#" + subTopic, subTopic);
		}
		
		if (AbstractRocketMQPushConsumer.class.isAssignableFrom(bean.getClass())) {
			DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
			consumer.setNamesrvAddr(properties.getNameServerAddress());
		    consumer.setMessageModel(rocketMQConsumer.messageMode());
		    consumer.subscribe(topic, tags);
		    consumer.setVipChannelEnabled(properties.getVipChannelEnabled());
		    consumer.setConsumeFromWhere(rocketMQConsumer.consumeFromWhere());
		    AbstractRocketMQPushConsumer abstractMQPushConsumer = (AbstractRocketMQPushConsumer) bean;
		    if(ConsumeMode.CONCURRENTLY.equals(rocketMQConsumer.consumeMode())){//并发
		    	consumer.registerMessageListener((MessageListenerConcurrently)(list, context) -> {
		    		return abstractMQPushConsumer.dealMessage(list, context);
		    	});
		    }else{//顺序
		    	consumer.registerMessageListener((MessageListenerOrderly)(list, context) -> {
		    		return abstractMQPushConsumer.dealMessage(list, context);
		    	});
		    }
		    consumer.start();
		    consumerCache.put(bean.getClass().getName(), consumer);
		    logger.info(String.format("%s is ready to subscribe topic[%s]", bean.getClass().getName(), subTopic));
		}
	}
	
	@PostConstruct
    public void configTransactionProducer() {
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RocketMQTransactionProducer.class);
        if(CollectionUtils.isEmpty(beans)){
            return;
        }
        ExecutorService executorService = new ThreadPoolExecutor(beans.size(), beans.size()*2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        Environment environment = applicationContext.getEnvironment();
        String producerGroup = StringUtils.isBlank(properties.getProducerGroup()) ? "defaultProducerGroup" : properties.getProducerGroup();
        beans.entrySet().forEach( transactionProducer -> {
            try {
                AbstractRocketMQTransactionProducer beanObj = AbstractRocketMQTransactionProducer.class.cast(transactionProducer.getValue());
                RocketMQTransactionProducer anno = beanObj.getClass().getAnnotation(RocketMQTransactionProducer.class);
                String innerProducerGroup = environment.resolvePlaceholders(anno.producerGroup()); 
                if(StringUtils.isBlank(innerProducerGroup)) innerProducerGroup = producerGroup;
                TransactionMQProducer producer = new TransactionMQProducer(innerProducerGroup);
                producer.setNamesrvAddr(properties.getNameServerAddress());
                producer.setExecutorService(executorService);
                producer.setTransactionListener(beanObj);
                producer.start();
                beanObj.setTransactionProducer(producer);
                transactionProducerCache.put(beanObj.getClass().getName(), producer);
            } catch (Exception e) {
            	logger.error("build transaction producer error : {}", e);
            }
        });
    }

	@PreDestroy
	public void destroy(){
		//consumer destroy
		for (Map.Entry<String, DefaultMQPushConsumer> entry : consumerCache.entrySet()) {
			entry.getValue().shutdown();
			logger.info(String.format("%s's consumer has shutdown", entry.getKey()));
		}
		
		//transaction producer destroy
		for (Map.Entry<String, TransactionMQProducer> entry : transactionProducerCache.entrySet()) {
			entry.getValue().shutdown();
			logger.info(String.format("%s's transaction producer has shutdown", entry.getKey()));
		}
	}
	
	/** 
	  * {@inheritDoc}   
	  * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) 
	  */
	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		this.applicationContext = applicationContext;
	}
	
	
}
